Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[trace][feature] Add trace usage telemetry #3368

Merged
merged 12 commits into from
Jun 6, 2024
1 change: 1 addition & 0 deletions src/promptflow-devkit/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Improvements
- Add retry logic when uploading run details to cloud.
- Add trace usage telemetry.

## v1.11.0 (2024.05.17)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from promptflow._sdk._errors import MissingAzurePackage
from promptflow._sdk._tracing import _is_azure_ext_installed, process_otlp_trace_request
from promptflow._sdk._utilities.tracing_utils import _telemetry_helper as trace_telemetry_helper
from promptflow._sdk._utilities.tracing_utils import aggregate_trace_count


def trace_collector(
Expand All @@ -37,6 +39,7 @@ def trace_collector(
:param credential: The credential object used to authenticate with cosmosdb. Default is None.
:type credential: Optional[object]
"""
all_spans = list()
content_type = request.headers.get("Content-Type")
# binary protobuf encoding
if "application/x-protobuf" in content_type:
Expand All @@ -48,7 +51,7 @@ def trace_collector(
if credential is not None:
# local prompt flow service will not pass credential, so this is runtime scenario
get_credential = credential if callable(credential) else lambda: credential # noqa: F841
process_otlp_trace_request(
all_spans = process_otlp_trace_request(
trace_request=trace_request,
get_created_by_info_with_cache=get_created_by_info_with_cache,
logger=logger,
Expand All @@ -62,13 +65,18 @@ def trace_collector(
from azure.identity import AzureCliCredential

get_credential = AzureCliCredential
process_otlp_trace_request(
all_spans = process_otlp_trace_request(
trace_request=trace_request,
get_created_by_info_with_cache=get_created_by_info_with_cache,
logger=logger,
get_credential=get_credential,
cloud_trace_only=cloud_trace_only,
)
# trace telemetry
if len(all_spans) > 0:
summary = aggregate_trace_count(all_spans=all_spans)
trace_telemetry_helper.append(summary=summary)

return "Traces received", 200

# JSON protobuf encoding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import copy
import datetime
import json
import logging
import multiprocessing
import threading
import time
import typing
from collections import namedtuple
from dataclasses import dataclass
Expand All @@ -26,6 +30,7 @@
SpanStatusFieldName,
)
from promptflow._sdk._constants import HOME_PROMPT_FLOW_DIR, AzureMLWorkspaceTriad
from promptflow._sdk._telemetry.telemetry import get_telemetry_logger
from promptflow._sdk.entities._trace import Span
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.core._errors import MissingRequiredPackage
Expand Down Expand Up @@ -330,3 +335,41 @@ def aggregate_trace_count(all_spans: typing.List[Span]) -> typing.Dict[TraceCoun
trace_count_summary[key] = trace_count_summary.get(key, 0) + 1

return trace_count_summary


class TraceTelemetryHelper:
"""Helper class for trace telemetry in prompt flow service."""

LOG_INTERVAL_SECONDS = 60
TELEMETRY_ACTIVITY_NAME = "pf.telemetry.trace_count"
CUSTOM_DIMENSIONS_TRACE_COUNT = "trace_count"

def __init__(self):
self._telemetry_logger = get_telemetry_logger()
self._lock = multiprocessing.Lock()
self._summary: typing.Dict[TraceCountKey, int] = dict()
self._thread = threading.Thread(target=self._schedule_flush, daemon=True)
self._thread.start()

def _schedule_flush(self) -> None:
while True:
time.sleep(self.LOG_INTERVAL_SECONDS)
self.log_telemetry()

def append(self, summary: typing.Dict[TraceCountKey, int]) -> None:
with self._lock:
for key, count in summary.items():
self._summary[key] = self._summary.get(key, 0) + count

def log_telemetry(self) -> None:
# only lock the process to operate the summary
with self._lock:
zhengfeiwang marked this conversation as resolved.
Show resolved Hide resolved
summary_to_log = copy.deepcopy(self._summary)
self._summary = dict()
for key, count in summary_to_log.items():
custom_dimensions = key._asdict()
custom_dimensions[self.CUSTOM_DIMENSIONS_TRACE_COUNT] = count
self._telemetry_logger.info(self.TELEMETRY_ACTIVITY_NAME, extra={"custom_dimensions": custom_dimensions})


_telemetry_helper = TraceTelemetryHelper()
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def __init__(
self.variable_recorder = variable_recorder

@staticmethod
def from_test_case(
test_class, test_func_name: str, recording_dir: str, **kwargs
) -> "PFAzureIntegrationTestRecording":
def from_test_case(test_class, test_func_name: str, **kwargs) -> "PFAzureIntegrationTestRecording":
test_class_name = test_class.__name__
if test_class_name in TEST_CLASSES_FOR_RUN_INTEGRATION_TEST_RECORDING:
return PFAzureRunIntegrationTestRecording(
Expand All @@ -81,7 +79,7 @@ def from_test_case(
user_object_id=kwargs["user_object_id"],
tenant_id=kwargs["tenant_id"],
variable_recorder=kwargs["variable_recorder"],
recording_dir=recording_dir,
recording_dir=kwargs.get("recording_dir"),
)

def _get_recording_file(self) -> Path:
Expand Down
1 change: 1 addition & 0 deletions src/promptflow/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Improvements
- [promptflow-devkit] Add retry logic when uploading run details to cloud.
- [promptflow-devkit] Add trace usage telemetry.

## v1.11.0 (2024.05.17)

Expand Down
Loading