Skip to content

Commit

Permalink
Client keep alive support (#138)
Browse files Browse the repository at this point in the history
Fixes #125
  • Loading branch information
cretz committed Oct 9, 2023
1 parent 516c72b commit ab4ded4
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 6 deletions.
12 changes: 12 additions & 0 deletions src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ internal partial struct ClientRetryOptions
public UIntPtr max_retries;
}

internal partial struct ClientKeepAliveOptions
{
[NativeTypeName("uint64_t")]
public ulong interval_millis;

[NativeTypeName("uint64_t")]
public ulong timeout_millis;
}

internal unsafe partial struct ClientOptions
{
[NativeTypeName("struct ByteArrayRef")]
Expand All @@ -137,6 +146,9 @@ internal unsafe partial struct ClientOptions

[NativeTypeName("const struct ClientRetryOptions *")]
public ClientRetryOptions* retry_options;

[NativeTypeName("const struct ClientKeepAliveOptions *")]
public ClientKeepAliveOptions* keep_alive_options;
}

internal unsafe partial struct ByteArray
Expand Down
17 changes: 17 additions & 0 deletions src/Temporalio/Bridge/OptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ internal static class OptionsExtensions
options.RpcRetry == null
? null
: scope.Pointer(options.RpcRetry.ToInteropOptions()),
keep_alive_options =
options.KeepAlive == null
? null
: scope.Pointer(options.KeepAlive.ToInteropOptions()),
};
}

Expand Down Expand Up @@ -285,6 +289,19 @@ internal static class OptionsExtensions
};
}

/// <summary>
/// Convert keep alive options.
/// </summary>
/// <param name="options">Options to convert.</param>
/// <returns>Converted options.</returns>
public static Interop.ClientKeepAliveOptions ToInteropOptions(
this Temporalio.Client.KeepAliveOptions options) =>
new()
{
interval_millis = (ulong)options.Interval.TotalMilliseconds,
timeout_millis = (ulong)options.Timeout.TotalMilliseconds,
};

/// <summary>
/// Convert start local options options.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ typedef struct ClientRetryOptions {
uintptr_t max_retries;
} ClientRetryOptions;

typedef struct ClientKeepAliveOptions {
uint64_t interval_millis;
uint64_t timeout_millis;
} ClientKeepAliveOptions;

typedef struct ClientOptions {
struct ByteArrayRef target_url;
struct ByteArrayRef client_name;
Expand All @@ -85,6 +90,7 @@ typedef struct ClientOptions {
struct ByteArrayRef identity;
const struct ClientTlsOptions *tls_options;
const struct ClientRetryOptions *retry_options;
const struct ClientKeepAliveOptions *keep_alive_options;
} ClientOptions;

typedef struct ByteArray {
Expand Down
25 changes: 21 additions & 4 deletions src/Temporalio/Bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use temporal_client::{
ClientOptions as CoreClientOptions, ClientOptionsBuilder, ClientTlsConfig, ConfiguredClient,
HealthService, OperatorService, RetryClient, RetryConfig, TemporalServiceClientWithMetrics,
TestService, TlsConfig, WorkflowService,
ClientKeepAliveConfig, ClientOptions as CoreClientOptions, ClientOptionsBuilder,
ClientTlsConfig, ConfiguredClient, HealthService, OperatorService, RetryClient, RetryConfig,
TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService,
};
use tonic::metadata::MetadataKey;
use url::Url;
Expand All @@ -26,6 +26,7 @@ pub struct ClientOptions {
identity: ByteArrayRef,
tls_options: *const ClientTlsOptions,
retry_options: *const ClientRetryOptions,
keep_alive_options: *const ClientKeepAliveOptions,
}

#[repr(C)]
Expand All @@ -46,6 +47,12 @@ pub struct ClientRetryOptions {
pub max_retries: usize,
}

#[repr(C)]
pub struct ClientKeepAliveOptions {
pub interval_millis: u64,
pub timeout_millis: u64,
}

type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;

pub struct Client {
Expand Down Expand Up @@ -425,7 +432,8 @@ impl TryFrom<&ClientOptions> for CoreClientOptions {
.identity(opts.identity.to_string())
.retry_config(
unsafe { opts.retry_options.as_ref() }.map_or(RetryConfig::default(), |c| c.into()),
);
)
.keep_alive(unsafe { opts.keep_alive_options.as_ref() }.map(Into::into));
if let Some(tls_config) = unsafe { opts.tls_options.as_ref() } {
opts_builder.tls_cfg(tls_config.try_into()?);
}
Expand Down Expand Up @@ -475,3 +483,12 @@ impl From<&ClientRetryOptions> for RetryConfig {
}
}
}

impl From<&ClientKeepAliveOptions> for ClientKeepAliveConfig {
fn from(opts: &ClientKeepAliveOptions) -> Self {
ClientKeepAliveConfig {
interval: Duration::from_millis(opts.interval_millis),
timeout: Duration::from_millis(opts.timeout_millis),
}
}
}
8 changes: 6 additions & 2 deletions src/Temporalio/Bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ impl Runtime {
if let Some(v) = unsafe { options.telemetry.as_ref() } {
if let Some(v) = unsafe { v.metrics.as_ref() } {
let _guard = core.tokio_handle().enter();
core.telemetry_mut().attach_late_init_metrics(create_meter(v, custom_meter)?);
core.telemetry_mut()
.attach_late_init_metrics(create_meter(v, custom_meter)?);
}
}
Ok(Runtime {
Expand Down Expand Up @@ -223,7 +224,10 @@ impl TryFrom<&TelemetryOptions> for CoreTelemetryOptions {
}
}

fn create_meter(options: &MetricsOptions, custom_meter: Option<CustomMetricMeterRef>) -> anyhow::Result<Arc<dyn CoreMeter>> {
fn create_meter(
options: &MetricsOptions,
custom_meter: Option<CustomMetricMeterRef>,
) -> anyhow::Result<Arc<dyn CoreMeter>> {
// OTel, Prom, or custom
if let Some(otel_options) = unsafe { options.opentelemetry.as_ref() } {
if !options.prometheus.is_null() || custom_meter.is_some() {
Expand Down
27 changes: 27 additions & 0 deletions src/Temporalio/Client/KeepAliveOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace Temporalio.Client
{
/// <summary>
/// Keep alive options for Temporal connections.
/// </summary>
public class KeepAliveOptions : ICloneable
{
/// <summary>
/// Gets or sets the interval to send HTTP2 keep alive pings.
/// </summary>
public TimeSpan Interval { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Gets or sets the timeout that the keep alive must be responded to within or the
/// connection will be closed.
/// </summary>
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(15);

/// <summary>
/// Create a shallow copy of these options.
/// </summary>
/// <returns>A shallow copy of these options.</returns>
public virtual object Clone() => MemberwiseClone();
}
}
12 changes: 12 additions & 0 deletions src/Temporalio/Client/TemporalConnectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ public TemporalConnectionOptions()
/// </remarks>
public RpcRetryOptions? RpcRetry { get; set; }

/// <summary>
/// Gets or sets keep alive options for this connection.
/// </summary>
/// <remarks>
/// Default enabled, set to null to disable.
/// </remarks>
public KeepAliveOptions? KeepAlive { get; set; } = new();

/// <summary>
/// Gets or sets the gRPC metadata for all calls (i.e. the headers).
/// </summary>
Expand Down Expand Up @@ -95,6 +103,10 @@ public virtual object Clone()
{
copy.RpcRetry = (RpcRetryOptions)RpcRetry.Clone();
}
if (KeepAlive != null)
{
copy.KeepAlive = (KeepAliveOptions)KeepAlive.Clone();
}
return copy;
}

Expand Down

0 comments on commit ab4ded4

Please sign in to comment.