Skip to content

Operations Startup Warmup

synthaicode edited this page Nov 12, 2025 · 1 revision

Purpose: Warm up ksqlDB read paths at startup without emitting data. Safe, read-only checks to stabilize first access.

Example (WarmupStartupFillService)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Ksql.Linq;
using Ksql.Linq.Runtime.Fill;

/// <summary>
/// Startup fill service that only issues read statements against ksqlDB:
/// a <c>SHOW STREAMS;</c> reachability check, one pull-count per configured table,
/// and one <c>EMIT CHANGES LIMIT 1</c> push query per configured stream.
/// Probe failures are logged and skipped; cancellation requested through the
/// caller's token is propagated instead of being swallowed.
/// </summary>
public sealed class WarmupStartupFillService : IStartupFillService
{
    // Per-probe timeouts passed to the context's pull and push helpers.
    private static readonly TimeSpan PullTimeout = TimeSpan.FromSeconds(5);
    private static readonly TimeSpan PushTimeout = TimeSpan.FromSeconds(10);

    private readonly IReadOnlyCollection<string> _tablesToProbe;
    private readonly IReadOnlyCollection<string> _streamsToProbe;

    /// <summary>
    /// Creates the service with the names to probe. Both sequences are copied,
    /// so later changes to the caller's collections do not affect the service.
    /// </summary>
    /// <param name="tablesToProbe">Table names to pull-count. Must not be null.</param>
    /// <param name="streamsToProbe">Stream names to push-query; null means none.</param>
    /// <exception cref="ArgumentNullException"><paramref name="tablesToProbe"/> is null.</exception>
    public WarmupStartupFillService(IEnumerable<string> tablesToProbe, IEnumerable<string>? streamsToProbe = null)
    {
        if (tablesToProbe == null)
        {
            throw new ArgumentNullException(nameof(tablesToProbe));
        }

        _tablesToProbe = new List<string>(tablesToProbe);
        _streamsToProbe = new List<string>(streamsToProbe ?? Array.Empty<string>());
    }

    /// <summary>
    /// Runs the reachability check, then the table probes, then the stream probes.
    /// </summary>
    /// <param name="context">Context used to execute the statements and to obtain the logger.</param>
    /// <param name="ct">Token checked before every probe.</param>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task RunAsync(KsqlContext context, CancellationToken ct)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        var log = context.Logger;

        // NOTE(review): ct is not forwarded to the context calls below because their
        // signatures are not visible here; pass it through if overloads accept a token.

        // Reachability
        ct.ThrowIfCancellationRequested();
        try
        {
            var show = await context.ExecuteStatementAsync("SHOW STREAMS;").ConfigureAwait(false);
            if (!show.IsSuccess)
            {
                log?.LogWarning("Warmup: SHOW STREAMS failed: {Msg}", show.Message);
            }
            else
            {
                log?.LogInformation("Warmup: ksqlDB reachable.");
            }
        }
        catch (Exception ex) when (!IsCallerCancellation(ex, ct))
        {
            log?.LogWarning(ex, "Warmup: reachability failed (continuing)");
        }

        // Tables (pull)
        foreach (var t in _tablesToProbe)
        {
            ct.ThrowIfCancellationRequested();
            try
            {
                var count = await context.PullCountAsync(t, limit: null, timeout: PullTimeout).ConfigureAwait(false);
                log?.LogDebug("Warmup pull-count {Table}: {Count}", t, count);
            }
            catch (Exception ex) when (!IsCallerCancellation(ex, ct))
            {
                log?.LogDebug(ex, "Warmup pull-count {Table} failed", t);
            }
        }

        // Streams (push, LIMIT 1)
        foreach (var s in _streamsToProbe)
        {
            ct.ThrowIfCancellationRequested();
            try
            {
                // The stream name is interpolated verbatim; names are expected to come
                // from trusted configuration, not from user input.
                var sql = $"SELECT * FROM {s} EMIT CHANGES LIMIT 1;";
                var count = await context.QueryStreamCountAsync(sql, timeout: PushTimeout).ConfigureAwait(false);
                log?.LogDebug("Warmup stream-count {Stream}: {Count}", s, count);
            }
            catch (Exception ex) when (!IsCallerCancellation(ex, ct))
            {
                log?.LogDebug(ex, "Warmup stream-count {Stream} failed", s);
            }
        }

        log?.LogInformation("Warmup finished (read-only).");
    }

    // True when the exception is a cancellation that coincides with the caller's token
    // being cancelled. Such exceptions must propagate; an OperationCanceledException
    // raised while ct is not cancelled (e.g. a probe timeout) is still treated as an
    // ordinary, logged probe failure.
    private static bool IsCallerCancellation(Exception ex, CancellationToken ct)
    {
        return ex is OperationCanceledException && ct.IsCancellationRequested;
    }
}

How to use

  • Minimal run: see repository examples/startup-warmup (Program.cs + README).
  • Integrate it into your host startup if needed. It checks the cancellation token before each table/stream probe and logs at Information, Warning, and Debug levels.

Notes

  • Aligns with project policy: no synthetic records are emitted during startup.
  • Use small timeouts (5–10s) and proceed even on partial failures.
  • For first-run environments, ensure Phase 1–3 DDL are in place (see Operations-Startup).

Clone this wiki locally