Skip to content

Examples Custom Executor

synthaicode edited this page Nov 12, 2025 · 1 revision

Purpose: Implement a custom IKsqlExecutor for logging/retry/policy without touching call sites.

Why

  • Centralize retries/backoff/circuit-breaker around KSQL calls.
  • Inject structured logs/metrics/correlation IDs.

Core idea

Wrap the default executor and add policies:

public sealed class LoggingRetryExecutor : IKsqlExecutor
{
    private readonly IKsqlExecutor _inner;
    public LoggingRetryExecutor(IKsqlDbClient client) => _inner = new KsqlExecutor(client);
    public async Task<KsqlDbResponse> ExecuteStatementAsync(string statement, CancellationToken ct = default)
    {
        Console.WriteLine($"[exec] {statement}");
        for (var attempt = 1; ; attempt++)
        {
            try { return await _inner.ExecuteStatementAsync(statement, ct); }
            catch when (attempt < 3) { await Task.Delay(TimeSpan.FromMilliseconds(200*attempt), ct); }
        }
    }
    public Task<KsqlDbResponse> ExecuteExplainAsync(string ksql, CancellationToken ct = default)
        => ExecuteStatementAsync($"EXPLAIN {ksql}", ct);
}

Run (repo example)

  • cd examples/custom-executor
  • dotnet run
  • Output shows retries/log lines and a stubbed KsqlDbResponse

Note: At present, injecting a custom executor into KsqlContext is internal. The example demonstrates policy composition directly with a stub IKsqlDbClient.

Clone this wiki locally