Skip to content

Tumbling Consumption

synthaicode edited this page Oct 25, 2025 · 3 revisions

Purpose: How to consume Tumbling outputs (live and base) and handle consumer behavior.

Live consumption (*_live)

await ctx.Set<Bar1m>().ForEachAsync(bar =>
{
  Console.WriteLine($"{bar.Symbol}@{bar.BucketStart:o}: {bar.Close}");
  return Task.CompletedTask;
});

Recommended settings

  • Consumer default AutoOffsetReset = Latest (follow new data)

Base reads (historical, point‑in‑time)

var rows1m = await TimeBucket.ReadAsync<Bar1m>(
  ctx, Period.Minutes(1), new[] { broker, symbol }, CancellationToken.None);

Recommended settings

  • For backfill/verification, set consumer AutoOffsetReset = Earliest
  • Optionally use rows_last (latest snapshot from 1s rows) to confirm presence

Related: Tumbling-Topics-Config, API-Reference

Consumer group and rebalancing

  • Use one consumer group per live stream to control partitions; rebalances can briefly pause processing.
  • Choose a stable GroupId; align partitions with expected parallelism.
  • Keep handlers bounded; push heavy work off the hot path.

Commit strategy (autoCommit vs manual)

Auto commit is simplest; for precise control use manual commit.

Auto commit (simple)

await ctx.Set<Bar1m>().ForEachAsync(bar =>
{
  Process(bar);
  return Task.CompletedTask;
});

Manual commit (ensure completion before acknowledging)

await ctx.Set<Bar1m>().ForEachAsync((bar, headers, meta) =>
{
  ProcessWithRetry(bar);
  ctx.Set<Bar1m>().Commit(bar);
  return Task.CompletedTask;
}, autoCommit: false);

Hints

  • Keys (partitioning): use stable key parts; RocksDB materializes queryable state for tables/time buckets.
  • Headers: include correlation ids to trace issues during reprocessing.

1s hub pattern (<base>_1s_rows)

Purpose: monitor latest ticks and feed quick downstream live outputs.

Example (logic‑only sketch)

await ctx.Set<Tick1s>().ForEachAsync(async (tick, headers, meta) =>
{
  await cache.SetAsync((tick.Broker, tick.Symbol), tick, ttl: TimeSpan.FromMinutes(10));

  await ctx.Set<NowPriceLive>().AddAsync(new NowPriceLive
  {
    Broker = tick.Broker,
    Symbol = tick.Symbol,
    Price = tick.Bid,
    At = tick.TimestampUtc
  });
});

Notes

  • 1s rows are usually short‑retained and support 1m/5m/1h base computations.
  • Design hubs to survive restarts; rely on idempotency and warm‑up flows.
Clone this wiki locally