Skip to content

Commit

Permalink
Added OTEL and prometheus connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
northpowered committed Apr 10, 2023
1 parent 2ffde28 commit 26879ce
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 130
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM python:3.10-slim

LABEL org.opencontainers.image.source="https://github.com/northpowered/temporal-rest-executor"

LABEL version="0.1.0"
LABEL version="0.2.0"

ENV POETRY_VIRTUALENVS_CREATE=false

Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ Some env vars:

**UVICORN_BIND_PORT = 8000**

**TELEMETRY_ENABLED = True**

**TELEMETRY_AGENT_HOST = localhost**

**TELEMETRY_AGENT_PORT = 6831**

**PROMETHEUS_ENDPOINT_ENABLED = True**

**PROMETHEUS_ENDPOINT_PORT = 9000**

## Use

Default FastAPI Swagger is available
Expand Down
241 changes: 240 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "temporal-rest-executor"
version = "0.1.0"
version = "0.2.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"
Expand All @@ -25,6 +25,16 @@ pytest-cov = "^4.0.0"
black = "^23.3.0"
flake8 = "^6.0.0"


[tool.poetry.group.otel.dependencies]
temporalio = {version = "^1.1.0", extras = ["opentelementy"]}
opentelemetry-exporter-jaeger-thrift = "^1.17.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
filterwarnings = [
'ignore::DeprecationWarning',
]
21 changes: 21 additions & 0 deletions src/executor/env.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from os import getenv

SERVICE_NAME: str = getenv("SERVICE_NAME", "Temporal REST executor")

UVICORN_RELOAD: bool = getenv("UVICORN_RELOAD", False)

Expand All @@ -18,3 +19,23 @@
TEMPORAL_INTERNAL_TASK_QUEUE: str = getenv(
"TEMPORAL_INTERNAL_TASK_QUEUE", "internal-execution-queue"
)

TELEMETRY_ENABLED: bool = getenv(
"TELEMETRY_ENABLED", True
)

TELEMETRY_AGENT_HOST: str = getenv(
"TELEMETRY_AGENT_HOST", "localhost"
)

TELEMETRY_AGENT_PORT: str = getenv(
"TELEMETRY_AGENT_PORT", "6831"
)

PROMETHEUS_ENDPOINT_ENABLED: bool = getenv(
"PROMETHEUS_ENDPOINT_ENABLED", True
)

PROMETHEUS_ENDPOINT_PORT: str = getenv(
"PROMETHEUS_ENDPOINT_PORT", "9000"
)
25 changes: 22 additions & 3 deletions src/executor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,38 @@
TEMPORAL_INTERNAL_TASK_QUEUE,
TEMPORAL_ENDPOINT,
TEMPORAL_INTERNAL_FLOW_NAME,
TEMPORAL_NAMESPACE
TEMPORAL_NAMESPACE,
SERVICE_NAME,
TELEMETRY_ENABLED,
TELEMETRY_AGENT_HOST,
TELEMETRY_AGENT_PORT,
PROMETHEUS_ENDPOINT_ENABLED,
PROMETHEUS_ENDPOINT_PORT
)
from contextlib import asynccontextmanager
from .router import execution_router
from rich.console import Console


__title__: str = "Temporal REST executor"
__version__: str = "0.1.0"
__title__: str = SERVICE_NAME
__version__: str = "0.2.0"


console = Console()


@asynccontextmanager
async def lifespan(app: FastAPI): # pragma: no cover
tracing_str: str = "[red bold]disabled[/ red bold]"

if TELEMETRY_ENABLED:
tracing_str = f"[blue bold]enabled[/ blue bold] on [blue bold]{TELEMETRY_AGENT_HOST}:{TELEMETRY_AGENT_PORT}[/ blue bold]"

prometheus_str: str = "[red bold]disabled[/ red bold]"

if PROMETHEUS_ENDPOINT_ENABLED:
prometheus_str = f"[blue bold]enabled[/ blue bold] on [blue bold]127.0.0.1:{PROMETHEUS_ENDPOINT_PORT}[/ blue bold]"

console.print(f"""
{__title__}:{__version__} loaded for this Temporal instance:
Temporal endpoint = [blue bold]{TEMPORAL_ENDPOINT}[/ blue bold]
Expand All @@ -27,6 +43,9 @@ async def lifespan(app: FastAPI): # pragma: no cover
Internal Executor workflow name is = [blue bold]{TEMPORAL_INTERNAL_FLOW_NAME}[/ blue bold]
And will be registred in = [blue bold]{TEMPORAL_INTERNAL_TASK_QUEUE}[/ blue bold] queue
OTEL tracing {tracing_str}
Prometheus endpoint {prometheus_str}
https://github.com/northpowered/temporal-rest-executor
Created with [red]:heart:[/ red] and by [blue bold]northpowered[/ blue bold]
Expand Down
46 changes: 46 additions & 0 deletions src/executor/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry import trace
from .env import (
TELEMETRY_AGENT_HOST,
TELEMETRY_AGENT_PORT,
SERVICE_NAME as SN,
PROMETHEUS_ENDPOINT_PORT,
TELEMETRY_ENABLED,
PROMETHEUS_ENDPOINT_ENABLED
)


def init_runtime_with_telemetry() -> Runtime:

# Setup global tracer for workflow traces
if TELEMETRY_ENABLED:
provider = TracerProvider(resource=Resource.create({SERVICE_NAME: SN}))
provider.add_span_processor(
BatchSpanProcessor(
JaegerExporter(
agent_host_name=TELEMETRY_AGENT_HOST,
agent_port=int(TELEMETRY_AGENT_PORT),
)
)
)
trace.set_tracer_provider(provider)

# Metrics setup
metrics_config: PrometheusConfig | None = None
if PROMETHEUS_ENDPOINT_ENABLED:
metrics_config = PrometheusConfig(
bind_address=f"127.0.0.1:{PROMETHEUS_ENDPOINT_PORT}"
)

return Runtime(telemetry=TelemetryConfig(
metrics=metrics_config
)
)


runtime = init_runtime_with_telemetry()
8 changes: 8 additions & 0 deletions src/executor/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
)
from datetime import timedelta
from uuid import uuid4
import opentelemetry.context
from temporalio.contrib.opentelemetry import TracingInterceptor
from .telemetry import runtime


async def temporal_client(): # pragma: no cover

opentelemetry.context.get_current()

return await Client.connect(
TEMPORAL_ENDPOINT,
namespace=TEMPORAL_NAMESPACE,
interceptors=[TracingInterceptor()],
runtime=runtime,
)


Expand Down
15 changes: 15 additions & 0 deletions src/misc/otel-metrics-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
receivers:
otlp:
protocols:
grpc:
exporters:
logging:
loglevel: debug
processors:
batch:
service:
pipelines:
metrics:
receivers: [otlp]
exporters: [logging]
processors: [batch]

0 comments on commit 26879ce

Please sign in to comment.