Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions packages/interceptors-opentelemetry/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
"@temporalio/workflow": "file:../workflow"
},
"peerDependencies": {
"@temporalio/activity": "file:../activity",
"@temporalio/client": "file:../client",
"@temporalio/common": "file:../common",
"@temporalio/worker": "file:../worker",
"@temporalio/workflow": "file:../workflow"
},
"peerDependenciesMeta": {
"@temporalio/workflow": {
"optional": true
}
},
"bugs": {
"url": "https://github.com/temporalio/sdk-typescript/issues"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/interceptors-opentelemetry/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as otel from '@opentelemetry/api';
import { Next, WorkflowClientInterceptor, WorkflowSignalInput, WorkflowStartInput } from '@temporalio/client';
import type { Next, WorkflowSignalInput, WorkflowStartInput, WorkflowClientInterceptor } from '@temporalio/client';
import { instrument, headersWithContext, RUN_ID_ATTR_KEY } from '../instrumentation';
import { SpanName, SPAN_DELIMITER } from '../workflow';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
* @module
*/
import * as otel from '@opentelemetry/api';
import { ApplicationFailure, ApplicationFailureCategory, Headers, defaultPayloadConverter } from '@temporalio/common';
import {
type Headers,
ApplicationFailure,
ApplicationFailureCategory,
defaultPayloadConverter,
} from '@temporalio/common';

/** Default trace header for opentelemetry interceptors */
export const TRACE_HEADER = '_tracer-data';
Expand Down
16 changes: 8 additions & 8 deletions packages/interceptors-opentelemetry/src/worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import * as otel from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';
import { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base';
import { Context as ActivityContext } from '@temporalio/activity';
import {
ActivityExecuteInput,
import type { Resource } from '@opentelemetry/resources';
import type { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base';
import type { Context as ActivityContext } from '@temporalio/activity';
import type {
Next,
ActivityInboundCallsInterceptor,
ActivityOutboundCallsInterceptor,
GetLogAttributesInput,
InjectedSink,
Next,
GetLogAttributesInput,
ActivityExecuteInput,
} from '@temporalio/worker';
import { instrument, extractContextFromHeaders } from '../instrumentation';
import { OpenTelemetryWorkflowExporter, SerializableSpan, SpanName, SPAN_DELIMITER } from '../workflow';
import { type OpenTelemetryWorkflowExporter, type SerializableSpan, SpanName, SPAN_DELIMITER } from '../workflow';

export interface InterceptorOptions {
readonly tracer?: otel.Tracer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import * as otel from '@opentelemetry/api';
import { AsyncLocalStorage } from '@temporalio/workflow';
import { ensureWorkflowModuleLoaded, getWorkflowModuleIfAvailable } from './workflow-module-loader';

const AsyncLocalStorage = getWorkflowModuleIfAvailable()?.AsyncLocalStorage;

export class ContextManager implements otel.ContextManager {
protected storage = new AsyncLocalStorage<otel.Context>();
// If `@temporalio/workflow` is not available, ignore for now.
// When ContextManager is constructed module resolution error will be thrown.
protected storage = AsyncLocalStorage ? new AsyncLocalStorage<otel.Context>() : undefined;

public constructor() {
ensureWorkflowModuleLoaded();
}

active(): otel.Context {
return this.storage.getStore() || otel.ROOT_CONTEXT;
return this.storage!.getStore() || otel.ROOT_CONTEXT;
}

bind<T>(context: otel.Context, target: T): T {
Expand Down Expand Up @@ -36,7 +44,7 @@ export class ContextManager implements otel.ContextManager {
}

disable(): this {
this.storage.disable();
this.storage!.disable();
return this;
}

Expand All @@ -47,6 +55,6 @@ export class ContextManager implements otel.ContextManager {
...args: A
): ReturnType<F> {
const cb = thisArg == null ? fn : fn.bind(thisArg);
return this.storage.run(context, cb, ...args);
return this.storage!.run(context, cb, ...args);
}
}
19 changes: 16 additions & 3 deletions packages/interceptors-opentelemetry/src/workflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import './runtime'; // Patch the Workflow isolate runtime for opentelemetry
import * as otel from '@opentelemetry/api';
import * as tracing from '@opentelemetry/sdk-trace-base';
import {
import type {
ActivityInput,
ContinueAsNew,
ContinueAsNewInput,
DisposeInput,
GetLogAttributesInput,
Expand All @@ -16,14 +15,14 @@ import {
StartChildWorkflowExecutionInput,
WorkflowExecuteInput,
WorkflowInboundCallsInterceptor,
workflowInfo,
WorkflowInternalsInterceptor,
WorkflowOutboundCallsInterceptor,
} from '@temporalio/workflow';
import { instrument, extractContextFromHeaders, headersWithContext } from '../instrumentation';
import { ContextManager } from './context-manager';
import { SpanName, SPAN_DELIMITER } from './definitions';
import { SpanExporter } from './span-exporter';
import { ensureWorkflowModuleLoaded, getWorkflowModule } from './workflow-module-loader';

export * from './definitions';

Expand All @@ -48,14 +47,21 @@ function getTracer(): otel.Tracer {
*
* Wraps the operation in an opentelemetry Span and links it to a parent Span context if one is
* provided in the Workflow input headers.
*
* `@temporalio/workflow` must be provided by host package in order to function.
*/
export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInterceptor {
protected readonly tracer = getTracer();

public constructor() {
ensureWorkflowModuleLoaded();
}

public async execute(
input: WorkflowExecuteInput,
next: Next<WorkflowInboundCallsInterceptor, 'execute'>
): Promise<unknown> {
const { workflowInfo, ContinueAsNew } = getWorkflowModule();
const context = await extractContextFromHeaders(input.headers);
return await instrument({
tracer: this.tracer,
Expand Down Expand Up @@ -84,10 +90,16 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
* Intercepts outbound calls to schedule an Activity
*
* Wraps the operation in an opentelemetry Span and passes it to the Activity via headers.
*
* `@temporalio/workflow` must be provided by host package in order to function.
*/
export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsInterceptor {
protected readonly tracer = getTracer();

public constructor() {
ensureWorkflowModuleLoaded();
}

public async scheduleActivity(
input: ActivityInput,
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleActivity'>
Expand Down Expand Up @@ -143,6 +155,7 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
input: ContinueAsNewInput,
next: Next<WorkflowOutboundCallsInterceptor, 'continueAsNew'>
): Promise<never> {
const { ContinueAsNew } = getWorkflowModule();
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.CONTINUE_AS_NEW}${SPAN_DELIMITER}${input.options.workflowType}`,
Expand Down
6 changes: 4 additions & 2 deletions packages/interceptors-opentelemetry/src/workflow/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
* Sets global variables required for importing opentelemetry in isolate
* @module
*/
import { inWorkflowContext } from '@temporalio/workflow';
import { getWorkflowModuleIfAvailable } from './workflow-module-loader';

if (inWorkflowContext()) {
const inWorkflowContext = getWorkflowModuleIfAvailable()?.inWorkflowContext;

if (inWorkflowContext?.()) {
// Required by opentelemetry (pretend to be a browser)
Object.assign(globalThis, {
performance: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import * as tracing from '@opentelemetry/sdk-trace-base';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import * as wf from '@temporalio/workflow';
import { OpenTelemetrySinks, SerializableSpan } from './definitions';
import { ensureWorkflowModuleLoaded, getWorkflowModuleIfAvailable } from './workflow-module-loader';

const { exporter } = wf.proxySinks<OpenTelemetrySinks>();
const exporter = getWorkflowModuleIfAvailable()?.proxySinks<OpenTelemetrySinks>()?.exporter;

export class SpanExporter implements tracing.SpanExporter {
public constructor() {
ensureWorkflowModuleLoaded();
}

public export(spans: tracing.ReadableSpan[], resultCallback: (result: ExportResult) => void): void {
exporter.export(spans.map((span) => this.makeSerializable(span)));
exporter!.export(spans.map((span) => this.makeSerializable(span)));
resultCallback({ code: ExportResultCode.SUCCESS });
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Utilities for working with a possibly missing `@temporalio/workflow` peer dependency
* @module
*/
import type * as WorkflowModule from '@temporalio/workflow';

// @temporalio/workflow is an optional peer dependency.
// It can be missing as long as the user isn't attempting to construct a workflow interceptor.
// If we start shipping ES modules alongside CJS, we will have to reconsider
// this dynamic import as `import` is async for ES modules.
let workflowModule: typeof WorkflowModule | undefined;
let workflowModuleLoadError: any | undefined;

try {
// eslint-disable-next-line @typescript-eslint/no-require-imports
workflowModule = require('@temporalio/workflow');
} catch (err) {
// Capture the module not found error to rethrow if the module is required
workflowModuleLoadError = err;
}

/**
* Returns `@temporalio/workflow` module if present.
* Throws if the module failed to load
*/
export function getWorkflowModule(): typeof WorkflowModule {
if (workflowModuleLoadError) {
throw workflowModuleLoadError;
}
return workflowModule!;
}

/**
* Checks if the workflow module loaded successfully and throws if not.
*/
export function ensureWorkflowModuleLoaded(): void {
if (workflowModuleLoadError) {
throw workflowModuleLoadError;
}
}

/**
* Returns the workflow module if available, or undefined if it failed to load.
*/
export function getWorkflowModuleIfAvailable(): typeof WorkflowModule | undefined {
return workflowModule;
}