Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sentry interceptor example #23

Merged
merged 13 commits into from
Dec 2, 2022
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [sentry](sentry) - Report errors to Sentry.

## Test

Expand Down
76 changes: 71 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ optional = true
temporalio = { version = "*", extras = ["opentelemetry"] }
opentelemetry-exporter-jaeger-thrift = "^1.13.0"

[tool.poetry.group.sentry]
optional = true
dependencies = { sentry-sdk = "^1.11.0" }

[tool.poe.tasks]
format = [{cmd = "black ."}, {cmd = "isort ."}]
lint = [{cmd = "black --check ."}, {cmd = "isort --check-only ."}, {ref = "lint-types" }]
Expand Down
19 changes: 19 additions & 0 deletions sentry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Sentry Sample

This sample shows how to configure [Sentry](https://sentry.io) to intercept and capture errors from the Temporal SDK.

For this sample, the optional `sentry` dependency group must be included. To include, run:

poetry install --with sentry

To run, first see [README.md](../README.md) for prerequisites. Set `SENTRY_DSN` environment variable to the Sentry DSN.
Then, run the following from this directory to start the worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

poetry run python starter.py

The workflow should complete with the hello result. If you alter the workflow or the activity to raise an
`ApplicationError` instead, it should appear in Sentry.
Empty file added sentry/__init__.py
Empty file.
91 changes: 91 additions & 0 deletions sentry/interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from dataclasses import asdict, is_dataclass
from typing import Any, Optional, Type, Union

import sentry_sdk
from temporalio import activity, workflow
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)


def _set_common_workflow_tags(
info: Union[workflow.Info, activity.Info], scope: sentry_sdk.Scope
):
scope.set_tag("temporal.workflow.type", info.workflow_type)
scope.set_tag("temporal.workflow.id", info.workflow_id)


class _SentryActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
transaction_name = input.fn.__module__ + "." + input.fn.__qualname__
scope_ctx_manager = sentry_sdk.configure_scope()
with scope_ctx_manager as scope, sentry_sdk.start_transaction(
name=transaction_name
):
scope.set_tag("temporal.execution_type", "activity")
activity_info = activity.info()
_set_common_workflow_tags(activity_info, scope)
scope.set_tag("temporal.activity.id", activity_info.activity_id)
scope.set_tag("temporal.activity.type", activity_info.activity_type)
scope.set_tag("temporal.activity.task_queue", activity_info.task_queue)
scope.set_tag(
"temporal.workflow.namespace", activity_info.workflow_namespace
)
scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id)
try:
return await super().execute_activity(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
scope.set_context("temporal.activity.input", asdict(input.args[0]))
scope.set_context("temporal.activity.info", activity.info().__dict__)
sentry_sdk.capture_exception(e)
raise e
finally:
scope.clear()


class _SentryWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
transaction_name = input.run_fn.__module__ + "." + input.run_fn.__qualname__
scope_ctx_manager = sentry_sdk.configure_scope()
with scope_ctx_manager as scope, sentry_sdk.start_transaction(
name=transaction_name
):
scope.set_tag("temporal.execution_type", "workflow")
workflow_info = workflow.info()
_set_common_workflow_tags(workflow_info, scope)
scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue)
scope.set_tag("temporal.workflow.namespace", workflow_info.namespace)
scope.set_tag("temporal.workflow.run_id", workflow_info.run_id)
try:
return await super().execute_workflow(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
scope.set_context("temporal.workflow.input", asdict(input.args[0]))
scope.set_context("temporal.workflow.info", workflow.info().__dict__)
sentry_sdk.capture_exception(e)
raise e
finally:
scope.clear()


class SentryInterceptor(Interceptor):
nediamond marked this conversation as resolved.
Show resolved Hide resolved
"""Temporal Interceptor class which will report workflow & activity exceptions to Sentry"""

def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
"""Implementation of
:py:meth:`temporalio.worker.Interceptor.intercept_activity`.
"""
return _SentryActivityInboundInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return _SentryWorkflowInterceptor
24 changes: 24 additions & 0 deletions sentry/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio
import os

from temporalio.client import Client

from sentry.worker import GreetingWorkflow


async def main():
# Connect client
client = await Client.connect("localhost:7233")

# Run workflow
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="sentry-workflow-id",
task_queue="sentry-task-queue",
)
print(f"Workflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())
64 changes: 64 additions & 0 deletions sentry/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import timedelta

import sentry_sdk
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

from sentry.interceptor import SentryInterceptor


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)


async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)

# Initialize the Sentry SDK
sentry_sdk.init(
dsn=os.environ.get("SENTRY_DSN"),
)

# Start client
client = await Client.connect("localhost:7233")

# Run a worker for the workflow
worker = Worker(
client,
task_queue="sentry-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
interceptors=[SentryInterceptor()], # Use SentryInterceptor for error reporting
)

await worker.run()


if __name__ == "__main__":
asyncio.run(main())