revert(prometheus): generic span metrics DEVX-637 DEVX-641 (#679)
nsheaps committed Jun 7, 2022
1 parent 94b2fc3 commit c47b5f2
Showing 14 changed files with 59 additions and 123 deletions.
5 changes: 1 addition & 4 deletions baseplate/__init__.py
@@ -558,7 +558,6 @@ def __init__(
name: str,
context: RequestContext,
baseplate: Optional[Baseplate] = None,
- component_name: Optional[str] = None,
):
self.trace_id = trace_id
self.parent_id = parent_id
@@ -570,7 +569,6 @@ def __init__(
self.baseplate = baseplate
self.component_name: Optional[str] = None
self.observers: List[SpanObserver] = []
- self.component_name = component_name

def register(self, observer: SpanObserver) -> None:
"""Register an observer to receive events from this span."""
@@ -732,8 +730,8 @@ def make_child(
name,
context_copy,
self.baseplate,
- component_name,
)
+ span.component_name = component_name
else:
span = Span(
self.trace_id,
@@ -744,7 +742,6 @@
name,
context_copy,
self.baseplate,
- component_name,
)
context_copy.span = span

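Taken together with the client hunks below (each of which drops a component_name keyword from a make_child call), the net effect of the __init__.py change is that component_name is no longer a Span constructor argument; make_child applies it by attribute assignment, and only on the LocalSpan branch. A minimal, self-contained sketch of that restored flow — Span, LocalSpan, and make_child here are simplified stand-ins, not baseplate's real signatures:

from typing import Optional


class Span:
    def __init__(self, name: str) -> None:
        self.name = name
        # post-revert: initialized to None, never passed through the constructor
        self.component_name: Optional[str] = None


class LocalSpan(Span):
    pass


def make_child(name: str, local: bool = False, component_name: Optional[str] = None) -> Span:
    if local:
        span = LocalSpan(name)
        # restored behavior: set as an attribute on the new local span
        span.component_name = component_name
    else:
        span = Span(name)
    return span


child = make_child("redis.GET", local=True, component_name="redis_client")
print(child.component_name)  # -> redis_client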
4 changes: 2 additions & 2 deletions baseplate/clients/cassandra.py
@@ -286,7 +286,7 @@ def execute_async(
**kwargs: Any,
) -> ResponseFuture:
trace_name = f"{self.context_name}.execute"
- span = self.server_span.make_child(trace_name, component_name="cassandra_client")
+ span = self.server_span.make_child(trace_name)
span.start()
# TODO: include custom payload
if isinstance(query, str):
@@ -322,7 +322,7 @@ def prepare(self, query: str, cache: bool = True) -> PreparedStatement:
pass

trace_name = f"{self.context_name}.prepare"
- with self.server_span.make_child(trace_name, component_name="cassandra_client") as span:
+ with self.server_span.make_child(trace_name) as span:
span.set_tag("statement", query)
prepared = self.session.prepare(query)
if cache:
2 changes: 1 addition & 1 deletion baseplate/clients/kombu.py
@@ -238,7 +238,7 @@ def publish(self, body: Any, routing_key: Optional[str] = None, **kwargs: Any) -
kwargs.setdefault("serializer", self.serializer.name)

trace_name = f"{self.name}.publish"
- child_span = self.span.make_child(trace_name, component_name="kombu_client")
+ child_span = self.span.make_child(trace_name)

child_span.set_tag("kind", "producer")
if routing_key:
2 changes: 1 addition & 1 deletion baseplate/clients/memcache/__init__.py
@@ -332,7 +332,7 @@ def _make_span(self, method_name: str) -> Span:
"""
trace_name = f"{self.context_name}.{method_name}"
- span = self.server_span.make_child(trace_name, component_name="memcache_client")
+ span = self.server_span.make_child(trace_name)
span.set_tag("method", method_name)
return span

4 changes: 2 additions & 2 deletions baseplate/clients/redis.py
@@ -163,7 +163,7 @@ def execute_command(self, *args: Any, **kwargs: Any) -> Any:
command = args[0]
trace_name = f"{self.context_name}.{command}"

- with self.server_span.make_child(trace_name, component_name="redis_client"):
+ with self.server_span.make_child(trace_name):
return super().execute_command(command, *args[1:], **kwargs)

# pylint: disable=arguments-differ
@@ -211,7 +211,7 @@ def __init__(

# pylint: disable=arguments-differ
def execute(self, **kwargs: Any) -> Any:
- with self.server_span.make_child(self.trace_name, component_name="redis_pipeline_client"):
+ with self.server_span.make_child(self.trace_name):
return super().execute(**kwargs)


4 changes: 2 additions & 2 deletions baseplate/clients/redis_cluster.py
@@ -430,7 +430,7 @@ def execute_command(self, *args: Any, **kwargs: Any) -> Any:
command = args[0]
trace_name = f"{self.context_name}.{command}"

- with self.server_span.make_child(trace_name, component_name="rediscluster_client"):
+ with self.server_span.make_child(trace_name):
res = super().execute_command(command, *args[1:], **kwargs)

self.hot_key_tracker.maybe_track_key_usage(list(args))
@@ -487,5 +487,5 @@ def execute_command(self, *args: Any, **kwargs: Any) -> Any:

# pylint: disable=arguments-differ
def execute(self, **kwargs: Any) -> Any:
- with self.server_span.make_child(self.trace_name, component_name="rediscluster_client"):
+ with self.server_span.make_child(self.trace_name):
return super().execute(**kwargs)
4 changes: 1 addition & 3 deletions baseplate/clients/requests.py
@@ -210,9 +210,7 @@ def send(self, request: PreparedRequest, **kwargs: Any) -> Response:
"http.url": request.url,
"http.slug": self.client_name if self.client_name is not None else self.name,
}
with self.span.make_child(f"{self.name}.request", component_name="http_client").with_tags(
tags
) as span:
with self.span.make_child(f"{self.name}.request").with_tags(tags) as span:
self._add_span_context(span, request)

# we cannot re-use the same session every time because sessions re-use the same
2 changes: 1 addition & 1 deletion baseplate/clients/sqlalchemy.py
@@ -225,7 +225,7 @@ def on_before_execute(
server_span = conn._execution_options["server_span"]

trace_name = f"{context_name}.execute"
- span = server_span.make_child(trace_name, component_name="sql_client")
+ span = server_span.make_child(trace_name)
span.set_tag("statement", statement[:1021] + "..." if len(statement) > 1024 else statement)
span.start()

2 changes: 1 addition & 1 deletion baseplate/clients/thrift.py
@@ -170,7 +170,7 @@ def _call_thrift_method(self: Any, *args: Any, **kwargs: Any) -> Any:
for time_remaining in self.retry_policy:
try:
with self.pool.connection() as prot:
- span = self.server_span.make_child(trace_name, component_name="thrift_client")
+ span = self.server_span.make_child(trace_name)
span.set_tag("slug", self.namespace)

client = self.client_cls(prot)
6 changes: 3 additions & 3 deletions baseplate/frameworks/queue_consumer/kafka.py
@@ -64,7 +64,7 @@ def run(self) -> None:
while not self.stopped:
context = self.baseplate.make_context_object()
with self.baseplate.make_server_span(context, f"{self.name}.pump") as span:
with span.make_child("kafka.consume", component_name="kafka_client"):
with span.make_child("kafka.consume"):
messages = self.consumer.consume(num_messages=self.batch_size, timeout=0)

if not messages:
@@ -77,7 +77,7 @@ def run(self) -> None:
time.sleep(1)
continue

with span.make_child("kafka.work_queue_put", component_name="kafka_client"):
with span.make_child("kafka.work_queue_put"):
for message in messages:
self.work_queue.put(message)

@@ -399,7 +399,7 @@ def commit_offset(
message.partition(),
message.offset(),
)
with context.span.make_child("kafka.commit", component_name="kafka_client"):
with context.span.make_child("kafka.commit"):
self.consumer.commit(message=message, asynchronous=False)

return KafkaMessageHandler(
50 changes: 2 additions & 48 deletions baseplate/lib/prometheus_metrics.py
@@ -1,14 +1,10 @@
import logging
- import re

- from typing import Any
- from typing import Dict
-
from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client import Histogram

logger = logging.getLogger(__name__)

# default_latency_buckets creates the default bucket values for time based histogram metrics.
# we want this to match the baseplate.go default_buckets, ref: https://github.com/reddit/baseplate.go/blob/master/prometheusbp/metrics.go.
@@ -30,8 +26,6 @@
default_size_start * default_size_factor ** i for i in range(default_size_count)
]

- generic_metrics = {}
-

class PrometheusHTTPClientMetrics:
prefix = "http_client"
@@ -383,8 +377,8 @@ def getHTTPSuccessLabel(httpStatusCode: int) -> str:
return str(200 <= httpStatusCode < 400).lower()


- class PrometheusGenericSpanMetrics:
- prefix = "generic"
+ class PrometheusLocalSpanMetrics:
+ prefix = "local_span"

# local labels and metrics
labels = [
@@ -444,43 +438,3 @@ def get_requests_total_metric(cls) -> Counter:
@classmethod
def get_active_requests_metric(cls) -> Gauge:
return cls.active_requests


- def get_metrics_for_prefix(prefix: str) -> PrometheusGenericSpanMetrics:
- # make sure metric names don't include disallowed chars
- # https://github.com/prometheus/client_python/blob/748ffb00600dc25fbd22d37d549578e8e370d996/prometheus_client/metrics_core.py#L10
- prefix = prefix.replace(".", "_")
- prefix = re.sub("[^0-9a-zA-Z_:]+", "", prefix)
-
- if prefix not in generic_metrics:
- # local labels and metrics
- labels = [
- "span",
- ]
- generic_metrics[prefix] = type(
- f"PrometheusGenericSpanMetrics<{prefix}>",
- (PrometheusGenericSpanMetrics,),
- {
- "prefix": prefix,
- # Reset the class attributes to avoid having the same prefix in the metric names
- "labels": labels,
- "latency_seconds": Histogram(
- f"{prefix}_latency_seconds",
- f"Latency histogram of {prefix} span",
- labels,
- buckets=default_latency_buckets,
- ),
- "requests_total": Counter(
- f"{prefix}_requests_total",
- f"Total number of {prefix} spans started",
- labels,
- ),
- "active_requests": Gauge(
- f"{prefix}_active_requests",
- f"Number of active {prefix} spans",
- labels,
- ),
- },
- )
- logger.debug("Created new metrics class for prefix %s", prefix)
- return generic_metrics[prefix]  # type: ignore

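For reference, the removed get_metrics_for_prefix helper sanitized arbitrary span prefixes into legal Prometheus metric names before building the per-prefix classes. A hedged standalone illustration of just that sanitization step — sanitize_prefix is a hypothetical name, but the two substitutions are copied from the removed code above:

import re


def sanitize_prefix(prefix: str) -> str:
    # dots become underscores, then anything outside [0-9a-zA-Z_:] is dropped,
    # matching the disallowed-character rules linked in the removed comment
    prefix = prefix.replace(".", "_")
    return re.sub("[^0-9a-zA-Z_:]+", "", prefix)


print(sanitize_prefix("redis.GET"))        # -> redis_GET
print(sanitize_prefix("my-service.call"))  # -> myservice_call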