Skip to content

Commit

Permalink
Sentry interceptor example (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
nediamond committed Dec 2, 2022
1 parent 8407dca commit 8e753f4
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [sentry](sentry) - Report errors to Sentry.

## Test

Expand Down
76 changes: 71 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ optional = true
temporalio = { version = "*", extras = ["opentelemetry"] }
opentelemetry-exporter-jaeger-thrift = "^1.13.0"

[tool.poetry.group.sentry]
optional = true
dependencies = { sentry-sdk = "^1.11.0" }

[tool.poe.tasks]
format = [{cmd = "black ."}, {cmd = "isort ."}]
lint = [{cmd = "black --check ."}, {cmd = "isort --check-only ."}, {ref = "lint-types" }]
Expand Down
19 changes: 19 additions & 0 deletions sentry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Sentry Sample

This sample shows how to configure [Sentry](https://sentry.io) to intercept and capture errors from the Temporal SDK.

For this sample, the optional `sentry` dependency group must be included. To include, run:

poetry install --with sentry

To run, first see [README.md](../README.md) for prerequisites. Set `SENTRY_DSN` environment variable to the Sentry DSN.
Then, run the following from this directory to start the worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

poetry run python starter.py

The workflow should complete with the hello result. If you alter the workflow or the activity to raise an
`ApplicationError` instead, it should appear in Sentry.
Empty file added sentry/__init__.py
Empty file.
91 changes: 91 additions & 0 deletions sentry/interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from dataclasses import asdict, is_dataclass
from typing import Any, Optional, Type, Union

import sentry_sdk
from temporalio import activity, workflow
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)


def _set_common_workflow_tags(
info: Union[workflow.Info, activity.Info], scope: sentry_sdk.Scope
):
scope.set_tag("temporal.workflow.type", info.workflow_type)
scope.set_tag("temporal.workflow.id", info.workflow_id)


class _SentryActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
transaction_name = input.fn.__module__ + "." + input.fn.__qualname__
scope_ctx_manager = sentry_sdk.configure_scope()
with scope_ctx_manager as scope, sentry_sdk.start_transaction(
name=transaction_name
):
scope.set_tag("temporal.execution_type", "activity")
activity_info = activity.info()
_set_common_workflow_tags(activity_info, scope)
scope.set_tag("temporal.activity.id", activity_info.activity_id)
scope.set_tag("temporal.activity.type", activity_info.activity_type)
scope.set_tag("temporal.activity.task_queue", activity_info.task_queue)
scope.set_tag(
"temporal.workflow.namespace", activity_info.workflow_namespace
)
scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id)
try:
return await super().execute_activity(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
scope.set_context("temporal.activity.input", asdict(input.args[0]))
scope.set_context("temporal.activity.info", activity.info().__dict__)
sentry_sdk.capture_exception(e)
raise e
finally:
scope.clear()


class _SentryWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
transaction_name = input.run_fn.__module__ + "." + input.run_fn.__qualname__
scope_ctx_manager = sentry_sdk.configure_scope()
with scope_ctx_manager as scope, sentry_sdk.start_transaction(
name=transaction_name
):
scope.set_tag("temporal.execution_type", "workflow")
workflow_info = workflow.info()
_set_common_workflow_tags(workflow_info, scope)
scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue)
scope.set_tag("temporal.workflow.namespace", workflow_info.namespace)
scope.set_tag("temporal.workflow.run_id", workflow_info.run_id)
try:
return await super().execute_workflow(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
scope.set_context("temporal.workflow.input", asdict(input.args[0]))
scope.set_context("temporal.workflow.info", workflow.info().__dict__)
sentry_sdk.capture_exception(e)
raise e
finally:
scope.clear()


class SentryInterceptor(Interceptor):
"""Temporal Interceptor class which will report workflow & activity exceptions to Sentry"""

def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
"""Implementation of
:py:meth:`temporalio.worker.Interceptor.intercept_activity`.
"""
return _SentryActivityInboundInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return _SentryWorkflowInterceptor
24 changes: 24 additions & 0 deletions sentry/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio
import os

from temporalio.client import Client

from sentry.worker import GreetingWorkflow


async def main():
# Connect client
client = await Client.connect("localhost:7233")

# Run workflow
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="sentry-workflow-id",
task_queue="sentry-task-queue",
)
print(f"Workflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())
64 changes: 64 additions & 0 deletions sentry/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import timedelta

import sentry_sdk
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

from sentry.interceptor import SentryInterceptor


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)


async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)

# Initialize the Sentry SDK
sentry_sdk.init(
dsn=os.environ.get("SENTRY_DSN"),
)

# Start client
client = await Client.connect("localhost:7233")

# Run a worker for the workflow
worker = Worker(
client,
task_queue="sentry-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
interceptors=[SentryInterceptor()], # Use SentryInterceptor for error reporting
)

await worker.run()


if __name__ == "__main__":
asyncio.run(main())

0 comments on commit 8e753f4

Please sign in to comment.